package customgrouping;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class NumberTopology {
    public static void main(String[] args) throws Exception {
        NumberSpout numberSpout = new NumberSpout();
        NumberBolt numberBolt1 = new NumberBolt();
        LessThanBolt lessThanBolt2 = new LessThanBolt();
        MoreThanBolt moreThanBolt2 = new MoreThanBolt();

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("numberSpout", numberSpout);
        topologyBuilder.setBolt("numberBolt1", numberBolt1).shuffleGrouping("numberSpout");
        // 自定义流分组
        // 参数1 上游组件id
        // 参数2 自定义流的id
        topologyBuilder.setBolt("lessThanBolt2", lessThanBolt2).globalGrouping("numberBolt1", "lessThan");
        topologyBuilder.setBolt("moreThanBolt2", moreThanBolt2).globalGrouping("numberBolt1", "moreThan");

        StormTopology topology = topologyBuilder.createTopology();

        LocalCluster localCluster = new LocalCluster();
        Config config = new Config();
        localCluster.submitTopology("numberTopology", config, topology);
    }
}
